/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.phoenix.util;

import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES;
import static org.apache.phoenix.query.BaseTest.generateUniqueName;
import static org.apache.phoenix.query.QueryConstants.MILLIS_IN_DAY;
import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN_FAMILY_NAME;
import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN_NAME;
import static org.apache.phoenix.schema.LiteralTTLExpression.TTL_EXPRESSION_DEFINED_IN_TABLE_DESCRIPTOR;
import static org.apache.phoenix.schema.LiteralTTLExpression.TTL_EXPRESSION_FOREVER;
import static org.apache.phoenix.util.PhoenixRuntime.CONNECTIONLESS;
import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL;
import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR;
import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL_TERMINATOR;
import static org.apache.phoenix.util.PhoenixRuntime.PHOENIX_TEST_DRIVER_URL_PARAM;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import java.io.File;
import java.io.IOException;
import java.math.BigDecimal;
import java.net.ServerSocket;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.sql.Connection;
import java.sql.Date;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.CompareOperator;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.CompactionState;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.CoprocessorDescriptor;
import org.apache.hadoop.hbase.client.CoprocessorDescriptorBuilder;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.client.coprocessor.Batch;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils.BlockingRpcCallback;
import org.apache.hadoop.hbase.ipc.ServerRpcController;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.phoenix.compile.AggregationManager;
import org.apache.phoenix.compile.ColumnResolver;
import org.apache.phoenix.compile.FromCompiler;
import org.apache.phoenix.compile.JoinCompiler;
import org.apache.phoenix.compile.JoinCompiler.JoinTable;
import org.apache.phoenix.compile.QueryPlan;
import org.apache.phoenix.compile.SequenceManager;
import org.apache.phoenix.compile.StatementContext;
import org.apache.phoenix.compile.StatementNormalizer;
import org.apache.phoenix.compile.SubqueryRewriter;
import org.apache.phoenix.compile.SubselectRewriter;
import org.apache.phoenix.coprocessor.CompactionScanner;
import org.apache.phoenix.coprocessor.generated.MetaDataProtos.ClearCacheRequest;
import org.apache.phoenix.coprocessor.generated.MetaDataProtos.ClearCacheResponse;
import org.apache.phoenix.coprocessor.generated.MetaDataProtos.MetaDataService;
import org.apache.phoenix.exception.SQLExceptionCode;
import org.apache.phoenix.execute.MutationState;
import org.apache.phoenix.expression.AndExpression;
import org.apache.phoenix.expression.ByteBasedLikeExpression;
import org.apache.phoenix.expression.ComparisonExpression;
import org.apache.phoenix.expression.Expression;
import org.apache.phoenix.expression.InListExpression;
import org.apache.phoenix.expression.KeyValueColumnExpression;
import org.apache.phoenix.expression.LiteralExpression;
import org.apache.phoenix.expression.NotExpression;
import org.apache.phoenix.expression.OrExpression;
import org.apache.phoenix.expression.RowKeyColumnExpression;
import org.apache.phoenix.expression.StringBasedLikeExpression;
import org.apache.phoenix.expression.aggregator.ClientAggregators;
import org.apache.phoenix.expression.function.SingleAggregateFunction;
import org.apache.phoenix.expression.function.SubstrFunction;
import org.apache.phoenix.expression.function.SumAggregateFunction;
import org.apache.phoenix.filter.MultiCQKeyValueComparisonFilter;
import org.apache.phoenix.filter.MultiEncodedCQKeyValueComparisonFilter;
import org.apache.phoenix.filter.MultiKeyValueComparisonFilter;
import org.apache.phoenix.filter.RowKeyComparisonFilter;
import org.apache.phoenix.filter.SingleCQKeyValueComparisonFilter;
import org.apache.phoenix.filter.SingleKeyValueComparisonFilter;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
import org.apache.phoenix.jdbc.PhoenixPreparedStatement;
import org.apache.phoenix.jdbc.PhoenixResultSet;
import org.apache.phoenix.jdbc.PhoenixStatement;
import org.apache.phoenix.parse.FilterableStatement;
import org.apache.phoenix.parse.LikeParseNode.LikeType;
import org.apache.phoenix.parse.SQLParser;
import org.apache.phoenix.parse.SelectStatement;
import org.apache.phoenix.query.ConnectionQueryServices;
import org.apache.phoenix.query.KeyRange;
import org.apache.phoenix.query.QueryConstants;
import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.query.QueryServicesOptions;
import org.apache.phoenix.schema.PColumn;
import org.apache.phoenix.schema.PIndexState;
import org.apache.phoenix.schema.PLongColumn;
import org.apache.phoenix.schema.PName;
import org.apache.phoenix.schema.PTable;
import org.apache.phoenix.schema.PTable.QualifierEncodingScheme;
import org.apache.phoenix.schema.PTableKey;
import org.apache.phoenix.schema.RowKeyValueAccessor;
import org.apache.phoenix.schema.SortOrder;
import org.apache.phoenix.schema.TTLExpression;
import org.apache.phoenix.schema.TTLExpressionFactory;
import org.apache.phoenix.schema.TableRef;
import org.apache.phoenix.schema.stats.GuidePostsInfo;
import org.apache.phoenix.schema.stats.GuidePostsKey;
import org.apache.phoenix.schema.tuple.Tuple;
import org.apache.phoenix.schema.types.PDataType;
import org.junit.Assert;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.phoenix.thirdparty.com.google.common.base.Objects;
import org.apache.phoenix.thirdparty.com.google.common.collect.Lists;

public class TestUtil {
  private static final Logger LOGGER = LoggerFactory.getLogger(TestUtil.class);

  private static final Long ZERO = new Long(0);
  public static final String DEFAULT_SCHEMA_NAME = "S";
  public static final String DEFAULT_DATA_TABLE_NAME = "T";
  public static final String DEFAULT_INDEX_TABLE_NAME = "I";
  public static final String DEFAULT_DATA_TABLE_FULL_NAME =
    SchemaUtil.getTableName(DEFAULT_SCHEMA_NAME, "T");
  public static final String DEFAULT_INDEX_TABLE_FULL_NAME =
    SchemaUtil.getTableName(DEFAULT_SCHEMA_NAME, "I");

  public static final String TEST_TABLE_SCHEMA = "(" + "   varchar_pk VARCHAR NOT NULL, "
    + "   char_pk CHAR(10) NOT NULL, " + "   int_pk INTEGER NOT NULL, "
    + "   long_pk BIGINT NOT NULL, " + "   decimal_pk DECIMAL(31, 10) NOT NULL, "
    + "   date_pk DATE NOT NULL, " + "   a.varchar_col1 VARCHAR, " + "   a.char_col1 CHAR(10), "
    + "   a.int_col1 INTEGER, " + "   a.long_col1 BIGINT, " + "   a.decimal_col1 DECIMAL(31, 10), "
    + "   a.date1 DATE, " + "   b.varchar_col2 VARCHAR, " + "   b.char_col2 CHAR(10), "
    + "   b.int_col2 INTEGER, " + "   b.long_col2 BIGINT, " + "   b.decimal_col2 DECIMAL(31, 10), "
    + "   b.date2 DATE "
    + "   CONSTRAINT pk PRIMARY KEY (varchar_pk, char_pk, int_pk, long_pk DESC, decimal_pk, date_pk)) ";

  private TestUtil() {
  }

  public static final String CF_NAME = "a";
  public static final byte[] CF = Bytes.toBytes(CF_NAME);

  public static final String CF2_NAME = "b";

  public final static String A_VALUE = "a";
  public final static byte[] A = Bytes.toBytes(A_VALUE);
  public final static String B_VALUE = "b";
  public final static byte[] B = Bytes.toBytes(B_VALUE);
  public final static String C_VALUE = "c";
  public final static byte[] C = Bytes.toBytes(C_VALUE);
  public final static String D_VALUE = "d";
  public final static byte[] D = Bytes.toBytes(D_VALUE);
  public final static String E_VALUE = "e";
  public final static byte[] E = Bytes.toBytes(E_VALUE);

  public final static String ROW1 = "00A123122312312";
  public final static String ROW2 = "00A223122312312";
  public final static String ROW3 = "00A323122312312";
  public final static String ROW4 = "00A423122312312";
  public final static String ROW5 = "00B523122312312";
  public final static String ROW6 = "00B623122312312";
  public final static String ROW7 = "00B723122312312";
  public final static String ROW8 = "00B823122312312";
  public final static String ROW9 = "00C923122312312";

  public final static String PARENTID1 = "0500x0000000001";
  public final static String PARENTID2 = "0500x0000000002";
  public final static String PARENTID3 = "0500x0000000003";
  public final static String PARENTID4 = "0500x0000000004";
  public final static String PARENTID5 = "0500x0000000005";
  public final static String PARENTID6 = "0500x0000000006";
  public final static String PARENTID7 = "0500x0000000007";
  public final static String PARENTID8 = "0500x0000000008";
  public final static String PARENTID9 = "0500x0000000009";

  public final static List<String> PARENTIDS = Lists.newArrayList(PARENTID1, PARENTID2, PARENTID3,
    PARENTID4, PARENTID5, PARENTID6, PARENTID7, PARENTID8, PARENTID9);

  public final static String ENTITYHISTID1 = "017x00000000001";
  public final static String ENTITYHISTID2 = "017x00000000002";
  public final static String ENTITYHISTID3 = "017x00000000003";
  public final static String ENTITYHISTID4 = "017x00000000004";
  public final static String ENTITYHISTID5 = "017x00000000005";
  public final static String ENTITYHISTID6 = "017x00000000006";
  public final static String ENTITYHISTID7 = "017x00000000007";
  public final static String ENTITYHISTID8 = "017x00000000008";
  public final static String ENTITYHISTID9 = "017x00000000009";

  public final static List<String> ENTITYHISTIDS =
    Lists.newArrayList(ENTITYHISTID1, ENTITYHISTID2, ENTITYHISTID3, ENTITYHISTID4, ENTITYHISTID5,
      ENTITYHISTID6, ENTITYHISTID7, ENTITYHISTID8, ENTITYHISTID9);

  public static final String LOCALHOST = "localhost";
  public static final String PHOENIX_JDBC_URL = JDBC_PROTOCOL + JDBC_PROTOCOL_SEPARATOR + LOCALHOST
    + JDBC_PROTOCOL_TERMINATOR + PHOENIX_TEST_DRIVER_URL_PARAM;
  public static final String PHOENIX_CONNECTIONLESS_JDBC_URL =
    JDBC_PROTOCOL + JDBC_PROTOCOL_SEPARATOR + CONNECTIONLESS + JDBC_PROTOCOL_TERMINATOR
      + PHOENIX_TEST_DRIVER_URL_PARAM;

  public static final String TEST_SCHEMA_FILE_NAME = "config" + File.separator + "test-schema.xml";
  public static final String CED_SCHEMA_FILE_NAME = "config" + File.separator + "schema.xml";
  public static final String ENTITY_HISTORY_TABLE_NAME = "ENTITY_HISTORY";
  public static final String ENTITY_HISTORY_SALTED_TABLE_NAME = "ENTITY_HISTORY_SALTED";
  public static final String ATABLE_NAME = "ATABLE";
  public static final String TABLE_WITH_ARRAY = "TABLE_WITH_ARRAY";
  public static final String SUM_DOUBLE_NAME = "SumDoubleTest";
  public static final String ATABLE_SCHEMA_NAME = "";
  public static final String BTABLE_NAME = "BTABLE";
  public static final String STABLE_NAME = "STABLE";
  public static final String STABLE_PK_NAME = "ID";
  public static final String STABLE_SCHEMA_NAME = "";
  public static final String CUSTOM_ENTITY_DATA_FULL_NAME = "CORE.CUSTOM_ENTITY_DATA";
  public static final String CUSTOM_ENTITY_DATA_NAME = "CUSTOM_ENTITY_DATA";
  public static final String CUSTOM_ENTITY_DATA_SCHEMA_NAME = "CORE";
  public static final String HBASE_NATIVE = "HBASE_NATIVE";
  public static final String HBASE_NATIVE_SCHEMA_NAME = "";
  public static final String HBASE_DYNAMIC_COLUMNS = "HBASE_DYNAMIC_COLUMNS";
  public static final String HBASE_DYNAMIC_COLUMNS_SCHEMA_NAME = "";
  public static final String PRODUCT_METRICS_NAME = "PRODUCT_METRICS";
  public static final String PTSDB_NAME = "PTSDB";
  public static final String PTSDB2_NAME = "PTSDB2";
  public static final String PTSDB3_NAME = "PTSDB3";
  public static final String PTSDB_SCHEMA_NAME = "";
  public static final String FUNKY_NAME = "FUNKY_NAMES";
  public static final String MULTI_CF_NAME = "MULTI_CF";
  public static final String MDTEST_NAME = "MDTEST";
  public static final String MDTEST_SCHEMA_NAME = "";
  public static final String KEYONLY_NAME = "KEYONLY";
  public static final String TABLE_WITH_SALTING = "TABLE_WITH_SALTING";
  public static final String INDEX_DATA_SCHEMA = "INDEX_TEST";
  public static final String INDEX_DATA_TABLE = "INDEX_DATA_TABLE";
  public static final String MUTABLE_INDEX_DATA_TABLE = "MUTABLE_INDEX_DATA_TABLE";
  public static final String TRANSACTIONAL_DATA_TABLE = "TRANSACTIONAL_DATA_TABLE";
  public static final String JOIN_SCHEMA = "Join";
  public static final String JOIN_ORDER_TABLE = "OrderTable";
  public static final String JOIN_CUSTOMER_TABLE = "CustomerTable";
  public static final String JOIN_ITEM_TABLE = "ItemTable";
  public static final String JOIN_SUPPLIER_TABLE = "SupplierTable";
  public static final String JOIN_COITEM_TABLE = "CoitemTable";
  public static final String JOIN_ORDER_TABLE_FULL_NAME =
    '"' + JOIN_SCHEMA + "\".\"" + JOIN_ORDER_TABLE + '"';
  public static final String JOIN_CUSTOMER_TABLE_FULL_NAME =
    '"' + JOIN_SCHEMA + "\".\"" + JOIN_CUSTOMER_TABLE + '"';
  public static final String JOIN_ITEM_TABLE_FULL_NAME =
    '"' + JOIN_SCHEMA + "\".\"" + JOIN_ITEM_TABLE + '"';
  public static final String JOIN_SUPPLIER_TABLE_FULL_NAME =
    '"' + JOIN_SCHEMA + "\".\"" + JOIN_SUPPLIER_TABLE + '"';
  public static final String JOIN_COITEM_TABLE_FULL_NAME =
    '"' + JOIN_SCHEMA + "\".\"" + JOIN_COITEM_TABLE + '"';
  public static final String JOIN_ORDER_TABLE_DISPLAY_NAME = JOIN_SCHEMA + "." + JOIN_ORDER_TABLE;
  public static final String JOIN_CUSTOMER_TABLE_DISPLAY_NAME =
    JOIN_SCHEMA + "." + JOIN_CUSTOMER_TABLE;
  public static final String JOIN_ITEM_TABLE_DISPLAY_NAME = JOIN_SCHEMA + "." + JOIN_ITEM_TABLE;
  public static final String JOIN_SUPPLIER_TABLE_DISPLAY_NAME =
    JOIN_SCHEMA + "." + JOIN_SUPPLIER_TABLE;
  public static final String JOIN_COITEM_TABLE_DISPLAY_NAME = JOIN_SCHEMA + "." + JOIN_COITEM_TABLE;
  public static final String BINARY_NAME = "BinaryTable";

  /**
   * Read-only properties used by all tests
   */
  public static final Properties TEST_PROPERTIES = new Properties() {
    @Override
    public String put(Object key, Object value) {
      throw new UnsupportedOperationException();
    }

    @Override
    public void clear() {
      throw new UnsupportedOperationException();
    }

    @Override
    public Object remove(Object key) {
      throw new UnsupportedOperationException();
    }
  };

  public static byte[][] getSplits(String tenantId) {
    return new byte[][] { HConstants.EMPTY_BYTE_ARRAY, Bytes.toBytes(tenantId + "00A"),
      Bytes.toBytes(tenantId + "00B"), Bytes.toBytes(tenantId + "00C"), };
  }

  public static void assertRoundEquals(BigDecimal bd1, BigDecimal bd2) {
    bd1 = bd1.round(PDataType.DEFAULT_MATH_CONTEXT);
    bd2 = bd2.round(PDataType.DEFAULT_MATH_CONTEXT);
    if (bd1.compareTo(bd2) != 0) {
      fail("expected:<" + bd1 + "> but was:<" + bd2 + ">");
    }
  }

  public static BigDecimal computeAverage(double sum, long count) {
    return BigDecimal.valueOf(sum).divide(BigDecimal.valueOf(count),
      PDataType.DEFAULT_MATH_CONTEXT);
  }

  public static BigDecimal computeAverage(long sum, long count) {
    return BigDecimal.valueOf(sum).divide(BigDecimal.valueOf(count),
      PDataType.DEFAULT_MATH_CONTEXT);
  }

  public static Expression constantComparison(CompareOperator op, PColumn c, Object o) {
    return new ComparisonExpression(
      Arrays.<Expression> asList(new KeyValueColumnExpression(c), LiteralExpression.newConstant(o)),
      op);
  }

  public static Expression kvColumn(PColumn c) {
    return new KeyValueColumnExpression(c);
  }

  public static Expression pkColumn(PColumn c, List<PColumn> columns) {
    return new RowKeyColumnExpression(c, new RowKeyValueAccessor(columns, columns.indexOf(c)));
  }

  public static Expression constantComparison(CompareOperator op, Expression e, Object o) {
    return new ComparisonExpression(Arrays.asList(e, LiteralExpression.newConstant(o)), op);
  }

  private static boolean useByteBasedRegex(StatementContext context) {
    return context.getConnection().getQueryServices().getProps().getBoolean(
      QueryServices.USE_BYTE_BASED_REGEX_ATTRIB, QueryServicesOptions.DEFAULT_USE_BYTE_BASED_REGEX);
  }

  public static Expression like(Expression e, Object o, StatementContext context) {
    return useByteBasedRegex(context)
      ? ByteBasedLikeExpression.create(Arrays.asList(e, LiteralExpression.newConstant(o)),
        LikeType.CASE_SENSITIVE)
      : StringBasedLikeExpression.create(Arrays.asList(e, LiteralExpression.newConstant(o)),
        LikeType.CASE_SENSITIVE);
  }

  public static Expression ilike(Expression e, Object o, StatementContext context) {
    return useByteBasedRegex(context)
      ? ByteBasedLikeExpression.create(Arrays.asList(e, LiteralExpression.newConstant(o)),
        LikeType.CASE_INSENSITIVE)
      : StringBasedLikeExpression.create(Arrays.asList(e, LiteralExpression.newConstant(o)),
        LikeType.CASE_INSENSITIVE);
  }

  public static Expression substr(Expression e, Object offset, Object length) {
    return new SubstrFunction(Arrays.asList(e, LiteralExpression.newConstant(offset),
      LiteralExpression.newConstant(length)));
  }

  public static Expression substr2(Expression e, Object offset) {

    return new SubstrFunction(
      Arrays.asList(e, LiteralExpression.newConstant(offset), LiteralExpression.newConstant(null)));
  }

  public static Expression columnComparison(CompareOperator op, Expression c1, Expression c2) {
    return new ComparisonExpression(Arrays.<Expression> asList(c1, c2), op);
  }

  public static SingleKeyValueComparisonFilter singleKVFilter(Expression e) {
    return new SingleCQKeyValueComparisonFilter(e);
  }

  public static RowKeyComparisonFilter rowKeyFilter(Expression e) {
    return new RowKeyComparisonFilter(e, QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES);
  }

  public static MultiKeyValueComparisonFilter multiKVFilter(Expression e) {
    return new MultiCQKeyValueComparisonFilter(e, false, ByteUtil.EMPTY_BYTE_ARRAY);
  }

  public static MultiEncodedCQKeyValueComparisonFilter multiEncodedKVFilter(Expression e,
    QualifierEncodingScheme encodingScheme) {
    return new MultiEncodedCQKeyValueComparisonFilter(e, encodingScheme, false, null);
  }

  public static Expression and(Expression... expressions) {
    return new AndExpression(Arrays.asList(expressions));
  }

  public static Expression not(Expression expression) {
    return new NotExpression(expression);
  }

  public static Expression or(Expression... expressions) {
    return new OrExpression(Arrays.asList(expressions));
  }

  public static Expression in(Expression... expressions) throws SQLException {
    return InListExpression.create(Arrays.asList(expressions), false, new ImmutableBytesWritable(),
      true);
  }

  public static Expression in(Expression e, Object... literals) throws SQLException {
    PDataType childType = e.getDataType();
    List<Expression> expressions = new ArrayList<Expression>(literals.length + 1);
    expressions.add(e);
    for (Object o : literals) {
      expressions.add(LiteralExpression.newConstant(o, childType));
    }
    return InListExpression.create(expressions, false, new ImmutableBytesWritable(), true);
  }

  public static void assertDegenerate(StatementContext context) {
    Scan scan = context.getScan();
    assertDegenerate(scan);
  }

  public static void assertDegenerate(Scan scan) {
    assertNull(scan.getFilter());
    assertArrayEquals(KeyRange.EMPTY_RANGE.getLowerRange(), scan.getStartRow());
    assertArrayEquals(KeyRange.EMPTY_RANGE.getLowerRange(), scan.getStopRow());
    assertEquals(null, scan.getFilter());
  }

  public static void assertNotDegenerate(Scan scan) {
    assertFalse(Bytes.compareTo(KeyRange.EMPTY_RANGE.getLowerRange(), scan.getStartRow()) == 0
      && Bytes.compareTo(KeyRange.EMPTY_RANGE.getLowerRange(), scan.getStopRow()) == 0);
  }

  public static void assertEmptyScanKey(Scan scan) {
    assertNull(scan.getFilter());
    assertArrayEquals(ByteUtil.EMPTY_BYTE_ARRAY, scan.getStartRow());
    assertArrayEquals(ByteUtil.EMPTY_BYTE_ARRAY, scan.getStopRow());
    assertEquals(null, scan.getFilter());
  }

  /**
   * Does a deep comparison of two Results, down to the byte arrays.
   * @param res1 first result to compare
   * @param res2 second result to compare
   * @throws Exception Every difference is throwing an exception
   */
  public static void compareTuples(Tuple res1, Tuple res2) throws Exception {
    if (res2 == null) {
      throw new Exception("There wasn't enough rows, we stopped at " + res1);
    }
    if (res1.size() != res2.size()) {
      throw new Exception("This row doesn't have the same number of KVs: " + res1.toString()
        + " compared to " + res2.toString());
    }
    for (int i = 0; i < res1.size(); i++) {
      Cell ourKV = res1.getValue(i);
      Cell replicatedKV = res2.getValue(i);
      if (!ourKV.equals(replicatedKV)) {
        throw new Exception(
          "This result was different: " + res1.toString() + " compared to " + res2.toString());
      }
    }
  }

  public static void clearMetaDataCache(Connection conn) throws Throwable {
    PhoenixConnection pconn = conn.unwrap(PhoenixConnection.class);
    Table htable =
      pconn.getQueryServices().getTable(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES);
    htable.coprocessorService(MetaDataService.class, HConstants.EMPTY_START_ROW,
      HConstants.EMPTY_END_ROW, new Batch.Call<MetaDataService, ClearCacheResponse>() {
        @Override
        public ClearCacheResponse call(MetaDataService instance) throws IOException {
          ServerRpcController controller = new ServerRpcController();
          BlockingRpcCallback<ClearCacheResponse> rpcCallback =
            new BlockingRpcCallback<ClearCacheResponse>();
          ClearCacheRequest.Builder builder = ClearCacheRequest.newBuilder();
          instance.clearCache(controller, builder.build(), rpcCallback);
          if (controller.getFailedOn() != null) {
            throw controller.getFailedOn();
          }
          return rpcCallback.get();
        }
      });
  }

  public static void closeStatement(Statement stmt) {
    try {
      stmt.close();
    } catch (Throwable ignore) {
    }
  }

  public static void closeConnection(Connection conn) {
    try {
      conn.close();
    } catch (Throwable ignore) {
    }
  }

  public static void closeStmtAndConn(Statement stmt, Connection conn) {
    closeStatement(stmt);
    closeConnection(conn);
  }

  public static void bindParams(PhoenixPreparedStatement stmt, List<Object> binds)
    throws SQLException {
    for (int i = 0; i < binds.size(); i++) {
      stmt.setObject(i + 1, binds.get(i));
    }
  }

  /**
   * @param conn      connection to be used
   * @param sortOrder sort order of column contain input values
   * @param id        id of the row being inserted
   * @param input     input to be inserted
   */
  public static void upsertRow(Connection conn, String tableName, String sortOrder, int id,
    Object input) throws SQLException {
    String dml = String.format("UPSERT INTO " + tableName + "_%s VALUES(?,?)", sortOrder);
    PreparedStatement stmt = conn.prepareStatement(dml);
    stmt.setInt(1, id);
    if (input instanceof String) stmt.setString(2, (String) input);
    else if (input instanceof Integer) stmt.setInt(2, (Integer) input);
    else if (input instanceof Double) stmt.setDouble(2, (Double) input);
    else if (input instanceof Float) stmt.setFloat(2, (Float) input);
    else if (input instanceof Boolean) stmt.setBoolean(2, (Boolean) input);
    else if (input instanceof Long) stmt.setLong(2, (Long) input);
    else throw new UnsupportedOperationException(
      "" + input.getClass() + " is not supported by upsertRow");
    stmt.execute();
    conn.commit();
  }

  public static void createGroupByTestTable(Connection conn, String tableName) throws SQLException {
    conn.createStatement().execute("create table " + tableName
      + "   (id varchar not null primary key,\n" + "    uri varchar, appcpu integer)");
  }

  private static void createTable(Connection conn, String inputSqlType, String tableName,
    String sortOrder) throws SQLException {
    String dmlFormat = "CREATE TABLE " + tableName + "_%s (id INTEGER NOT NULL, pk %s NOT NULL, "
      + "kv %s " + "CONSTRAINT PK_CONSTRAINT PRIMARY KEY (id, pk %s))";
    String ddl = String.format(dmlFormat, sortOrder, inputSqlType, inputSqlType, sortOrder);
    conn.createStatement().execute(ddl);
    conn.commit();
  }

  /**
   * Creates a table to be used for testing. It contains one id column, one varchar column to be
   * used as input, and one column which will contain null values
   * @param conn         connection to be used
   * @param inputSqlType sql type of input
   * @param inputList    list of values to be inserted into the pk column
   */
  public static String initTables(Connection conn, String inputSqlType, List<Object> inputList)
    throws Exception {
    String tableName = generateUniqueName();
    createTable(conn, inputSqlType, tableName, "ASC");
    createTable(conn, inputSqlType, tableName, "DESC");
    for (int i = 0; i < inputList.size(); ++i) {
      upsertRow(conn, tableName, "ASC", i, inputList.get(i));
      upsertRow(conn, tableName, "DESC", i, inputList.get(i));
    }
    return tableName;
  }

  public static List<KeyRange> getAllSplits(Connection conn, String tableName) throws SQLException {
    return getSplits(conn, tableName, null, null, null, null, null);
  }

  public static List<KeyRange> getAllSplits(Connection conn, String tableName, String where,
    String selectClause) throws SQLException {
    return getSplits(conn, tableName, null, null, null, where, selectClause);
  }

  public static List<KeyRange> getSplits(Connection conn, String tableName, String pkCol,
    byte[] lowerRange, byte[] upperRange, String whereClauseSuffix, String selectClause)
    throws SQLException {
    String whereClauseStart = (lowerRange == null && upperRange == null
      ? ""
      : " WHERE "
        + ((lowerRange != null ? (pkCol + " >= ? " + (upperRange != null ? " AND " : "")) : "")
          + (upperRange != null ? (pkCol + " < ?") : "")));
    String whereClause = whereClauseSuffix == null ? whereClauseStart
      : whereClauseStart.length() == 0 ? (" WHERE " + whereClauseSuffix)
      : (" AND " + whereClauseSuffix);
    String query = "SELECT /*+ NO_INDEX */ " + selectClause + " FROM " + tableName + whereClause;
    PhoenixPreparedStatement pstmt =
      conn.prepareStatement(query).unwrap(PhoenixPreparedStatement.class);
    if (lowerRange != null) {
      pstmt.setBytes(1, lowerRange);
    }
    if (upperRange != null) {
      pstmt.setBytes(lowerRange != null ? 2 : 1, upperRange);
    }
    pstmt.execute();
    List<KeyRange> keyRanges = pstmt.getQueryPlan().getSplits();
    return keyRanges;
  }

  public static Collection<GuidePostsInfo> getGuidePostsList(Connection conn, String tableName)
    throws SQLException {
    return getGuidePostsList(conn, tableName, null, null, null, null);
  }

  public static Collection<GuidePostsInfo> getGuidePostsList(Connection conn, String tableName,
    String where) throws SQLException {
    return getGuidePostsList(conn, tableName, null, null, null, where);
  }

  public static Collection<GuidePostsInfo> getGuidePostsList(Connection conn, String tableName,
    String pkCol, byte[] lowerRange, byte[] upperRange, String whereClauseSuffix)
    throws SQLException {
    String whereClauseStart = (lowerRange == null && upperRange == null
      ? ""
      : " WHERE "
        + ((lowerRange != null ? (pkCol + " >= ? " + (upperRange != null ? " AND " : "")) : "")
          + (upperRange != null ? (pkCol + " < ?") : "")));
    String whereClause = whereClauseSuffix == null ? whereClauseStart
      : whereClauseStart.length() == 0 ? (" WHERE " + whereClauseSuffix)
      : (" AND " + whereClauseSuffix);
    String query = "SELECT /*+ NO_INDEX */ COUNT(*) FROM " + tableName + whereClause;
    PhoenixPreparedStatement pstmt =
      conn.prepareStatement(query).unwrap(PhoenixPreparedStatement.class);
    if (lowerRange != null) {
      pstmt.setBytes(1, lowerRange);
    }
    if (upperRange != null) {
      pstmt.setBytes(lowerRange != null ? 2 : 1, upperRange);
    }
    pstmt.execute();
    TableRef tableRef = pstmt.getQueryPlan().getTableRef();
    PhoenixConnection pconn = conn.unwrap(PhoenixConnection.class);
    PTable table = tableRef.getTable();
    GuidePostsInfo info = pconn.getQueryServices().getTableStats(
      new GuidePostsKey(table.getName().getBytes(), SchemaUtil.getEmptyColumnFamily(table)));
    return Collections.singletonList(info);
  }

  public static void analyzeTable(Connection conn, String tableName)
    throws IOException, SQLException {
    analyzeTable(conn, tableName, false);
  }

  public static void analyzeTable(Connection conn, String tableName, boolean transactional)
    throws IOException, SQLException {
    String query = "UPDATE STATISTICS " + tableName;
    conn.createStatement().execute(query);
    // if the table is transactional burn a txn in order to make sure the next txn read pointer is
    // close to wall clock time
    conn.commit();
  }

  public static void analyzeTableIndex(Connection conn, String tableName)
    throws IOException, SQLException {
    String query = "UPDATE STATISTICS " + tableName + " INDEX";
    conn.createStatement().execute(query);
  }

  public static void analyzeTableColumns(Connection conn, String tableName)
    throws IOException, SQLException {
    String query = "UPDATE STATISTICS " + tableName + " COLUMNS";
    conn.createStatement().execute(query);
  }

  public static void analyzeTable(String url, Properties props, String tableName)
    throws IOException, SQLException {
    Connection conn = DriverManager.getConnection(url, props);
    analyzeTable(conn, tableName);
    conn.close();
  }

  public static void setRowKeyColumns(PreparedStatement stmt, int i) throws SQLException {
    // insert row
    stmt.setString(1, "varchar" + String.valueOf(i));
    stmt.setString(2, "char" + String.valueOf(i));
    stmt.setInt(3, i);
    stmt.setLong(4, i);
    stmt.setBigDecimal(5, new BigDecimal(i * 0.5d));
    Date date =
      new Date(DateUtil.parseDate("2015-01-01 00:00:00").getTime() + (i - 1) * MILLIS_IN_DAY);
    stmt.setDate(6, date);
  }

  public static void validateRowKeyColumns(ResultSet rs, int i) throws SQLException {
    assertTrue(rs.next());
    assertEquals(rs.getString(1), "varchar" + String.valueOf(i));
    assertEquals(rs.getString(2), "char" + String.valueOf(i));
    assertEquals(rs.getInt(3), i);
    assertEquals(rs.getInt(4), i);
    assertEquals(rs.getBigDecimal(5), new BigDecimal(i * 0.5d));
    Date date =
      new Date(DateUtil.parseDate("2015-01-01 00:00:00").getTime() + (i - 1) * MILLIS_IN_DAY);
    assertEquals(rs.getDate(6), date);
  }

  public static ClientAggregators getSingleSumAggregator(String url, Properties props)
    throws SQLException {
    try (PhoenixConnection pconn =
      DriverManager.getConnection(url, props).unwrap(PhoenixConnection.class)) {
      PhoenixStatement statement = new PhoenixStatement(pconn);
      StatementContext context =
        new StatementContext(statement, null, new Scan(), new SequenceManager(statement));
      AggregationManager aggregationManager = context.getAggregationManager();
      SumAggregateFunction func = new SumAggregateFunction(
        Arrays.<Expression> asList(new KeyValueColumnExpression(new PLongColumn() {
          @Override
          public PName getName() {
            return SINGLE_COLUMN_NAME;
          }

          @Override
          public PName getFamilyName() {
            return SINGLE_COLUMN_FAMILY_NAME;
          }

          @Override
          public int getPosition() {
            return 0;
          }

          @Override
          public SortOrder getSortOrder() {
            return SortOrder.getDefault();
          }

          @Override
          public Integer getArraySize() {
            return 0;
          }

          @Override
          public byte[] getViewConstant() {
            return null;
          }

          @Override
          public boolean isViewReferenced() {
            return false;
          }

          @Override
          public String getExpressionStr() {
            return null;
          }

          @Override
          public long getTimestamp() {
            return HConstants.LATEST_TIMESTAMP;
          }

          @Override
          public boolean isDerived() {
            return false;
          }

          @Override
          public boolean isExcluded() {
            return false;
          }

          @Override
          public boolean isRowTimestamp() {
            return false;
          }

          @Override
          public boolean isDynamic() {
            return false;
          }

          @Override
          public byte[] getColumnQualifierBytes() {
            return SINGLE_COLUMN_NAME.getBytes();
          }
        })), null);
      aggregationManager.setAggregators(
        new ClientAggregators(Collections.<SingleAggregateFunction> singletonList(func), 1));
      ClientAggregators aggregators = aggregationManager.getAggregators();
      return aggregators;
    }
  }

  public static void createMultiCFTestTable(Connection conn, String tableName, String options)
    throws SQLException {
    String ddl =
      "create table if not exists " + tableName + "(" + "   varchar_pk VARCHAR NOT NULL, "
        + "   char_pk CHAR(5) NOT NULL, " + "   int_pk INTEGER NOT NULL, "
        + "   long_pk BIGINT NOT NULL, " + "   decimal_pk DECIMAL(31, 10) NOT NULL, "
        + "   a.varchar_col1 VARCHAR, " + "   a.char_col1 CHAR(5), " + "   a.int_col1 INTEGER, "
        + "   a.long_col1 BIGINT, " + "   a.decimal_col1 DECIMAL(31, 10), "
        + "   b.varchar_col2 VARCHAR, " + "   b.char_col2 CHAR(5), " + "   b.int_col2 INTEGER, "
        + "   b.long_col2 BIGINT, " + "   b.decimal_col2 DECIMAL, " + "   b.date_col DATE "
        + "   CONSTRAINT pk PRIMARY KEY (varchar_pk, char_pk, int_pk, long_pk DESC, decimal_pk)) "
        + (options != null ? options : "");
    conn.createStatement().execute(ddl);
  }

  public static void flush(HBaseTestingUtility utility, TableName table) throws IOException {
    Admin admin = utility.getAdmin();
    admin.flush(table);
  }

  public static void minorCompact(HBaseTestingUtility utility, TableName table)
    throws IOException, InterruptedException {
    try {
      CompactionScanner.setForceMinorCompaction(true);
      Admin admin = utility.getAdmin();
      admin.compact(table);
      int waitForCompactionToCompleteCounter = 0;
      while (CompactionScanner.getForceMinorCompaction()) {
        waitForCompactionToCompleteCounter++;
        if (waitForCompactionToCompleteCounter > 20) {
          Assert.fail();
        }
        Thread.sleep(1000);
      }
    } finally {
      CompactionScanner.setForceMinorCompaction(false);
    }
  }

  public static void majorCompact(HBaseTestingUtility utility, TableName table)
    throws IOException, InterruptedException {
    long compactionRequestedSCN = EnvironmentEdgeManager.currentTimeMillis();
    Admin admin = utility.getAdmin();
    admin.majorCompact(table);
    long lastCompactionTimestamp;
    CompactionState state = null;
    CompactionState previousState = null;
    while (
      (state = admin.getCompactionState(table)).equals(CompactionState.MAJOR)
        || state.equals(CompactionState.MAJOR_AND_MINOR)
        || (lastCompactionTimestamp = admin.getLastMajorCompactionTimestamp(table))
            < compactionRequestedSCN
    ) {
      // In HBase 2.5 getLastMajorCompactionTimestamp doesn't seem to get updated when the
      // clock is stopped, so check for the state going to NONE instead
      if (
        state.equals(CompactionState.NONE)
          && (previousState != null && (previousState.equals(CompactionState.MAJOR_AND_MINOR)
            || previousState.equals(CompactionState.MAJOR)))
      ) {
        break;
      }
      previousState = state;
      Thread.sleep(100);
    }
  }

  /**
   * Runs a major compaction, and then waits until the compaction is complete before returning.
   * @param tableName name of the table to be compacted
   */
  public static void doMajorCompaction(Connection conn, String tableName) throws Exception {

    tableName = SchemaUtil.normalizeIdentifier(tableName);

    // We simply write a marker row, request a major compaction, and then wait until the marker
    // row is gone
    PhoenixConnection pconn = conn.unwrap(PhoenixConnection.class);
    PTable table = pconn.getTable(new PTableKey(pconn.getTenantId(), tableName));
    ConnectionQueryServices services = conn.unwrap(PhoenixConnection.class).getQueryServices();
    MutationState mutationState = pconn.getMutationState();
    if (table.isTransactional()) {
      mutationState.startTransaction(table.getTransactionProvider());
    }
    try (Table htable = mutationState.getHTable(table)) {
      byte[] markerRowKey = Bytes.toBytes("TO_DELETE");

      Put put = new Put(markerRowKey);
      long timestamp = 0L;
      // We do not want to wait an hour because of PHOENIX_MAX_LOOKBACK_AGE_CONF_KEY
      // So set the timestamp of the put and delete as early as possible
      put.addColumn(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES,
        QueryConstants.EMPTY_COLUMN_VALUE_BYTES, timestamp,
        QueryConstants.EMPTY_COLUMN_VALUE_BYTES);
      htable.put(put);
      Delete delete = new Delete(markerRowKey);
      delete.addColumn(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES,
        QueryConstants.EMPTY_COLUMN_VALUE_BYTES, timestamp);
      htable.delete(delete);
      htable.close();
      if (table.isTransactional()) {
        mutationState.commit();
      }

      Admin hbaseAdmin = services.getAdmin();
      hbaseAdmin.flush(TableName.valueOf(tableName));
      hbaseAdmin.majorCompact(TableName.valueOf(tableName));
      hbaseAdmin.close();

      boolean compactionDone = false;
      while (!compactionDone) {
        Thread.sleep(6000L);
        Scan scan = new Scan();
        scan.withStartRow(markerRowKey);
        scan.withStopRow(Bytes.add(markerRowKey, new byte[] { 0 }));
        scan.setRaw(true);

        try (Table htableForRawScan = services.getTable(Bytes.toBytes(tableName))) {
          ResultScanner scanner = htableForRawScan.getScanner(scan);
          List<Result> results = Lists.newArrayList(scanner);
          LOGGER.info("Results: " + results);
          compactionDone = results.isEmpty();
          scanner.close();
        }
        LOGGER.info("Compaction done: " + compactionDone);

        // need to run compaction after the next txn snapshot has been written so that compaction
        // can remove deleted rows
        if (!compactionDone && table.isTransactional()) {
          hbaseAdmin = services.getAdmin();
          hbaseAdmin.flush(TableName.valueOf(tableName));
          hbaseAdmin.majorCompact(TableName.valueOf(tableName));
          hbaseAdmin.close();
        }
      }
    }
  }

  public static void createTransactionalTable(Connection conn, String tableName)
    throws SQLException {
    createTransactionalTable(conn, tableName, "");
  }

  public static void createTransactionalTable(Connection conn, String tableName, String extraProps)
    throws SQLException {
    conn.createStatement().execute("create table " + tableName + TestUtil.TEST_TABLE_SCHEMA
      + "TRANSACTIONAL=true" + (extraProps.length() == 0 ? "" : ("," + extraProps)));
  }

  public static void dumpTable(Connection conn, TableName tableName)
    throws SQLException, IOException {
    ConnectionQueryServices cqs = conn.unwrap(PhoenixConnection.class).getQueryServices();
    Table table = cqs.getTable(tableName.getName());
    dumpTable(table);
  }

  public static void dumpTable(Table table) throws IOException {
    System.out.println("************ dumping " + table + " **************");
    Scan s = new Scan();
    s.setRaw(true);
    s.readAllVersions();
    int cellCount = 0;
    int rowCount = 0;
    try (ResultScanner scanner = table.getScanner(s)) {
      Result result;
      while ((result = scanner.next()) != null) {
        rowCount++;
        System.out.println("Row count: " + rowCount);
        CellScanner cellScanner = result.cellScanner();
        Cell current;
        while (cellScanner.advance()) {
          current = cellScanner.current();
          System.out
            .println(current + " column= " + Bytes.toStringBinary(CellUtil.cloneQualifier(current))
              + " val=" + Bytes.toStringBinary(CellUtil.cloneValue(current)));
          cellCount++;
        }
      }
    }
    System.out.println("----- Row count: " + rowCount + " Cell count: " + cellCount + " -----");
  }

  public static int getRawRowCount(Table table) throws IOException {
    return getRowCount(table, true);
  }

  public static int getRowCount(Table table, boolean isRaw) throws IOException {
    Scan s = new Scan();
    s.setRaw(isRaw);
    int rows = 0;
    try (ResultScanner scanner = table.getScanner(s)) {
      while (scanner.next() != null) {
        rows++;
      }
    }
    return rows;
  }

  public static CellCount getCellCount(Table table, boolean isRaw) throws IOException {
    Scan s = new Scan();
    s.setRaw(isRaw);
    s.readAllVersions();

    CellCount cellCount = new CellCount();
    try (ResultScanner scanner = table.getScanner(s)) {
      Result result = null;
      while ((result = scanner.next()) != null) {
        CellScanner cellScanner = result.cellScanner();
        Cell current = null;
        while (cellScanner.advance()) {
          current = cellScanner.current();
          cellCount.addOrUpdateCell(Bytes.toString(CellUtil.cloneRow(current)));
        }
      }
    }
    return cellCount;
  }

  public static class CellCount {
    private Map<String, Integer> rowCountMap = new HashMap<String, Integer>();

    public void addOrUpdateCell(String key) {
      addOrUpdateCells(key, 1);
    }

    public void addOrUpdateCells(String key, int count) {
      if (rowCountMap.containsKey(key)) {
        rowCountMap.put(key, rowCountMap.get(key) + count);
      } else {
        insertRow(key, count);
      }
    }

    public void insertRow(String key, int count) {
      rowCountMap.put(key, count);
    }

    public void removeRow(String key) {
      rowCountMap.remove(key);
    }

    public int getCellCount(String key) {
      if (rowCountMap.containsKey(key)) {
        return rowCountMap.get(key);
      } else {
        return 0;
      }
    }

    @Override
    public boolean equals(Object o) {
      if (this == o) {
        return true;
      }
      if (o == null || getClass() != o.getClass()) {
        return false;
      }
      CellCount other = (CellCount) o;
      return rowCountMap.equals(other.rowCountMap);
    }

    @Override
    public String toString() {
      final StringBuilder sb = new StringBuilder("CellCount{");
      sb.append("rowCountMap=").append(rowCountMap);
      sb.append('}');
      return sb.toString();
    }
  }

  public static void dumpIndexStatus(Connection conn, String indexName)
    throws IOException, SQLException {
    try (Table table = conn.unwrap(PhoenixConnection.class).getQueryServices()
      .getTable(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES)) {
      System.out.println("************ dumping index status for " + indexName + " **************");
      Scan s = new Scan();
      s.setRaw(true);
      s.readAllVersions();
      byte[] startRow = SchemaUtil.getTableKeyFromFullName(indexName);
      s.withStartRow(startRow);
      s.withStopRow(
        ByteUtil.nextKey(ByteUtil.concat(startRow, QueryConstants.SEPARATOR_BYTE_ARRAY)));
      try (ResultScanner scanner = table.getScanner(s)) {
        Result result = null;
        while ((result = scanner.next()) != null) {
          CellScanner cellScanner = result.cellScanner();
          Cell current = null;
          while (cellScanner.advance()) {
            current = cellScanner.current();
            if (
              Bytes.compareTo(current.getQualifierArray(), current.getQualifierOffset(),
                current.getQualifierLength(), PhoenixDatabaseMetaData.INDEX_STATE_BYTES, 0,
                PhoenixDatabaseMetaData.INDEX_STATE_BYTES.length) == 0
            ) {
              System.out.println(current.getTimestamp() + "/INDEX_STATE=" + PIndexState
                .fromSerializedValue(current.getValueArray()[current.getValueOffset()]));
            }
          }
        }
      }
      System.out.println("-----------------------------------------------");
    }
  }

  public static void printResultSet(ResultSet rs) throws SQLException {
    while (rs.next()) {
      printResult(rs, false);
    }
  }

  public static void printResult(ResultSet rs, boolean multiLine) throws SQLException {
    StringBuilder builder = new StringBuilder();
    int columnCount = rs.getMetaData().getColumnCount();
    for (int i = 0; i < columnCount; i++) {
      Object value = rs.getObject(i + 1);
      String output = value == null ? "null" : value.toString();
      builder.append(output);
      if (i + 1 < columnCount) {
        builder.append(",");
        if (multiLine) {
          builder.append("\n");
        }
      }
      System.out.println(builder.toString());
    }
  }

  public static void waitForIndexRebuild(Connection conn, String fullIndexName,
    PIndexState indexState) throws InterruptedException, SQLException {
    waitForIndexState(conn, fullIndexName, indexState, 0L);
  }

  private static class IndexStateCheck {
    public final PIndexState indexState;
    public final Long indexDisableTimestamp;
    public final Boolean success;

    public IndexStateCheck(PIndexState indexState, Long indexDisableTimestamp, Boolean success) {
      this.indexState = indexState;
      this.indexDisableTimestamp = indexDisableTimestamp;
      this.success = success;
    }
  }

  public static void waitForIndexState(Connection conn, String fullIndexName,
    PIndexState expectedIndexState) throws InterruptedException, SQLException {
    int maxTries = 120, nTries = 0;
    PIndexState actualIndexState = null;
    do {
      String schema = SchemaUtil.getSchemaNameFromFullName(fullIndexName);
      String index = SchemaUtil.getTableNameFromFullName(fullIndexName);
      Thread.sleep(1000); // sleep 1 sec
      String query = "SELECT " + PhoenixDatabaseMetaData.INDEX_STATE + " FROM "
        + PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME + " WHERE ("
        + PhoenixDatabaseMetaData.TABLE_SCHEM + "," + PhoenixDatabaseMetaData.TABLE_NAME + ") = ("
        + "'" + schema + "','" + index + "') " + "AND " + PhoenixDatabaseMetaData.COLUMN_FAMILY
        + " IS NULL AND " + PhoenixDatabaseMetaData.COLUMN_NAME + " IS NULL";
      ResultSet rs = conn.createStatement().executeQuery(query);
      if (rs.next()) {
        actualIndexState = PIndexState.fromSerializedValue(rs.getString(1));
        boolean matchesExpected = (actualIndexState == expectedIndexState);
        if (matchesExpected) {
          return;
        }
      }
    } while (++nTries < maxTries);
    fail("Ran out of time waiting for index state to become " + expectedIndexState
      + " last seen actual state is "
      + (actualIndexState == null ? "Unknown" : actualIndexState.toString()));
  }

  public static void waitForIndexState(Connection conn, String fullIndexName,
    PIndexState expectedIndexState, Long expectedIndexDisableTimestamp)
    throws InterruptedException, SQLException {
    int maxTries = 60, nTries = 0;
    do {
      Thread.sleep(1000); // sleep 1 sec
      IndexStateCheck state = checkIndexStateInternal(conn, fullIndexName, expectedIndexState,
        expectedIndexDisableTimestamp);
      if (state.success != null) {
        if (Boolean.TRUE.equals(state.success)) {
          return;
        }
        fail("Index state will not become " + expectedIndexState);
      }
    } while (++nTries < maxTries);
    fail("Ran out of time waiting for index state to become " + expectedIndexState);
  }

  public static boolean checkIndexState(Connection conn, String fullIndexName,
    PIndexState expectedIndexState, Long expectedIndexDisableTimestamp) throws SQLException {
    return Boolean.TRUE.equals(checkIndexStateInternal(conn, fullIndexName, expectedIndexState,
      expectedIndexDisableTimestamp).success);
  }

  public static void assertIndexState(Connection conn, String fullIndexName,
    PIndexState expectedIndexState, Long expectedIndexDisableTimestamp) throws SQLException {
    IndexStateCheck state = checkIndexStateInternal(conn, fullIndexName, expectedIndexState,
      expectedIndexDisableTimestamp);
    if (!Boolean.TRUE.equals(state.success)) {
      if (expectedIndexState != null) {
        assertEquals(expectedIndexState, state.indexState);
      }
      if (expectedIndexDisableTimestamp != null) {
        assertEquals(expectedIndexDisableTimestamp, state.indexDisableTimestamp);
      }
    }
  }

  public static PIndexState getIndexState(Connection conn, String fullIndexName)
    throws SQLException {
    IndexStateCheck state = checkIndexStateInternal(conn, fullIndexName, null, null);
    return state.indexState;
  }

  public static long getPendingDisableCount(PhoenixConnection conn, String indexTableName) {
    byte[] indexTableKey = SchemaUtil.getTableKeyFromFullName(indexTableName);
    Get get = new Get(indexTableKey);
    get.addColumn(TABLE_FAMILY_BYTES, PhoenixDatabaseMetaData.PENDING_DISABLE_COUNT_BYTES);

    try {
      Result pendingDisableCountResult = conn.getQueryServices()
        .getTable(SchemaUtil.getPhysicalTableName(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME,
          conn.getQueryServices().getProps()).getName())
        .get(get);
      return Bytes.toLong(pendingDisableCountResult.getValue(TABLE_FAMILY_BYTES,
        PhoenixDatabaseMetaData.PENDING_DISABLE_COUNT_BYTES));
    } catch (Exception e) {
      LOGGER.error("Exception in getPendingDisableCount: " + e);
      return 0;
    }
  }

  private static IndexStateCheck checkIndexStateInternal(Connection conn, String fullIndexName,
    PIndexState expectedIndexState, Long expectedIndexDisableTimestamp) throws SQLException {
    String schema = SchemaUtil.getSchemaNameFromFullName(fullIndexName);
    String index = SchemaUtil.getTableNameFromFullName(fullIndexName);
    String query = "SELECT CAST(" + PhoenixDatabaseMetaData.INDEX_DISABLE_TIMESTAMP + " AS BIGINT),"
      + PhoenixDatabaseMetaData.INDEX_STATE + " FROM " + PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME
      + " WHERE (" + PhoenixDatabaseMetaData.TABLE_SCHEM + "," + PhoenixDatabaseMetaData.TABLE_NAME
      + ") = (" + "'" + schema + "','" + index + "') " + "AND "
      + PhoenixDatabaseMetaData.COLUMN_FAMILY + " IS NULL AND "
      + PhoenixDatabaseMetaData.COLUMN_NAME + " IS NULL";
    ResultSet rs = conn.createStatement().executeQuery(query);
    Long actualIndexDisableTimestamp = null;
    PIndexState actualIndexState = null;
    if (rs.next()) {
      actualIndexDisableTimestamp = rs.getLong(1);
      actualIndexState = PIndexState.fromSerializedValue(rs.getString(2));
      boolean matchesExpected = (expectedIndexDisableTimestamp == null
        || Objects.equal(actualIndexDisableTimestamp, expectedIndexDisableTimestamp))
        && (expectedIndexState == null || actualIndexState == expectedIndexState);
      if (matchesExpected) {
        return new IndexStateCheck(actualIndexState, actualIndexDisableTimestamp, Boolean.TRUE);
      }
      if (ZERO.equals(actualIndexDisableTimestamp)) {
        return new IndexStateCheck(actualIndexState, actualIndexDisableTimestamp, Boolean.FALSE);
      }
    }
    return new IndexStateCheck(actualIndexState, actualIndexDisableTimestamp, null);
  }

  public static long getRowCount(Connection conn, String tableName) throws SQLException {
    ResultSet rs =
      conn.createStatement().executeQuery("SELECT /*+ NO_INDEX */ count(*) FROM " + tableName);
    assertTrue(rs.next());
    return rs.getLong(1);
  }

  public static long getRowCount(Connection conn, String tableName, boolean skipIndex)
    throws SQLException {
    String query =
      String.format("SELECT %s count(*) FROM %s", (skipIndex ? "/*+ NO_INDEX */" : ""), tableName);
    try (ResultSet rs = conn.createStatement().executeQuery(query)) {
      assertTrue(rs.next());
      return rs.getLong(1);
    }
  }

  public static long getRowCountFromIndex(Connection conn, String tableName, String indexName)
    throws SQLException {
    String query = String.format("SELECT count(*) FROM %s", tableName);
    try (ResultSet rs = conn.createStatement().executeQuery(query)) {
      PhoenixResultSet prs = rs.unwrap(PhoenixResultSet.class);
      String explainPlan = QueryUtil.getExplainPlan(prs.getUnderlyingIterator());
      assertTrue(explainPlan.contains(indexName));
      assertTrue(rs.next());
      return rs.getLong(1);
    }
  }

  public static void addCoprocessor(Connection conn, String tableName, Class coprocessorClass)
    throws Exception {
    int priority = QueryServicesOptions.DEFAULT_COPROCESSOR_PRIORITY + 100;
    ConnectionQueryServices services = conn.unwrap(PhoenixConnection.class).getQueryServices();
    TableDescriptor descriptor = services.getTableDescriptor(Bytes.toBytes(tableName));
    TableDescriptorBuilder descriptorBuilder = null;
    if (
      !descriptor.getCoprocessorDescriptors().stream().map(CoprocessorDescriptor::getClassName)
        .collect(Collectors.toList()).contains(coprocessorClass.getName())
    ) {
      descriptorBuilder = TableDescriptorBuilder.newBuilder(descriptor);
      descriptorBuilder.setCoprocessor(CoprocessorDescriptorBuilder
        .newBuilder(coprocessorClass.getName()).setPriority(priority).build());
    } else {
      return;
    }
    final int retries = 10;
    int numTries = 10;
    descriptor = descriptorBuilder.build();
    try (Admin admin = services.getAdmin()) {
      admin.modifyTable(descriptor);
      while (
        !admin.getDescriptor(TableName.valueOf(tableName)).equals(descriptor) && numTries > 0
      ) {
        numTries--;
        if (numTries == 0) {
          throw new Exception(
            "Failed to add " + coprocessorClass.getName() + " after " + retries + " retries.");
        }
        Thread.sleep(1000);
      }
    }
  }

  public static void removeCoprocessor(Connection conn, String tableName, Class coprocessorClass)
    throws Exception {
    ConnectionQueryServices services = conn.unwrap(PhoenixConnection.class).getQueryServices();
    TableDescriptor descriptor = services.getTableDescriptor(Bytes.toBytes(tableName));
    TableDescriptorBuilder descriptorBuilder = null;
    if (
      descriptor.getCoprocessorDescriptors().stream().map(CoprocessorDescriptor::getClassName)
        .collect(Collectors.toList()).contains(coprocessorClass.getName())
    ) {
      descriptorBuilder = TableDescriptorBuilder.newBuilder(descriptor);
      descriptorBuilder.removeCoprocessor(coprocessorClass.getName());
    } else {
      return;
    }
    final int retries = 10;
    int numTries = retries;
    descriptor = descriptorBuilder.build();
    try (Admin admin = services.getAdmin()) {
      admin.modifyTable(descriptor);
      while (
        !admin.getDescriptor(TableName.valueOf(tableName)).equals(descriptor) && numTries > 0
      ) {
        numTries--;
        if (numTries == 0) {
          throw new Exception(
            "Failed to remove " + coprocessorClass.getName() + " after " + retries + " retries.");
        }
        Thread.sleep(1000);
      }
    }
  }

  public static boolean compare(CompareOperator op, ImmutableBytesWritable lhsOutPtr,
    ImmutableBytesWritable rhsOutPtr) {
    int compareResult = Bytes.compareTo(lhsOutPtr.get(), lhsOutPtr.getOffset(),
      lhsOutPtr.getLength(), rhsOutPtr.get(), rhsOutPtr.getOffset(), rhsOutPtr.getLength());
    return ByteUtil.compare(op, compareResult);
  }

  public static QueryPlan getOptimizeQueryPlan(Connection conn, String sql) throws SQLException {
    PhoenixPreparedStatement statement =
      conn.prepareStatement(sql).unwrap(PhoenixPreparedStatement.class);
    QueryPlan queryPlan = statement.optimizeQuery(sql);
    queryPlan.iterator();
    return queryPlan;
  }

  public static QueryPlan getOptimizeQueryPlanNoIterator(Connection conn, String sql)
    throws SQLException {
    PhoenixPreparedStatement statement =
      conn.prepareStatement(sql).unwrap(PhoenixPreparedStatement.class);
    QueryPlan queryPlan = statement.optimizeQuery(sql);
    return queryPlan;
  }

  public static void assertResultSet(ResultSet rs, Object[][] rows) throws Exception {
    for (int rowIndex = 0; rowIndex < rows.length; rowIndex++) {
      assertTrue("rowIndex:[" + rowIndex + "] rs.next error!", rs.next());
      for (int columnIndex = 1; columnIndex <= rows[rowIndex].length; columnIndex++) {
        Object realValue = rs.getObject(columnIndex);
        Object expectedValue = rows[rowIndex][columnIndex - 1];
        if (realValue == null) {
          assertNull("rowIndex:[" + rowIndex + "],columnIndex:[" + columnIndex + "]",
            expectedValue);
        } else {
          assertEquals("rowIndex:[" + rowIndex + "],columnIndex:[" + columnIndex + "]",
            expectedValue, realValue);
        }
      }
    }
    assertTrue(!rs.next());
  }

  /**
   * Find a random free port in localhost for binding.
   * @return A port number or -1 for failure.
   */
  public static int getRandomPort() {
    try (ServerSocket socket = new ServerSocket(0)) {
      socket.setReuseAddress(true);
      return socket.getLocalPort();
    } catch (IOException e) {
      return -1;
    }
  }

  public static boolean hasFilter(Scan scan, Class<? extends Filter> filterClass) {
    Iterator<Filter> filterIter = ScanUtil.getFilterIterator(scan);
    while (filterIter.hasNext()) {
      Filter filter = filterIter.next();
      if (filterClass.isInstance(filter)) {
        return true;
      }
    }
    return false;
  }

  public static JoinTable getJoinTable(String query, PhoenixConnection connection)
    throws SQLException {
    SQLParser parser = new SQLParser(query);
    SelectStatement select = SubselectRewriter.flatten(parser.parseQuery(), connection);
    ColumnResolver resolver = FromCompiler.getResolverForQuery(select, connection);
    select = StatementNormalizer.normalize(select, resolver);
    SelectStatement transformedSelect = SubqueryRewriter.transform(select, resolver, connection);
    if (transformedSelect != select) {
      resolver = FromCompiler.getResolverForQuery(transformedSelect, connection);
      select = StatementNormalizer.normalize(transformedSelect, resolver);
    }
    PhoenixStatement stmt = connection.createStatement().unwrap(PhoenixStatement.class);
    return JoinCompiler.compile(stmt, select, resolver);
  }

  public static void assertSelectStatement(FilterableStatement selectStatement, String sql) {
    assertTrue(selectStatement.toString().trim().equals(sql));
  }

  public static void assertSqlExceptionCode(SQLExceptionCode code, SQLException se) {
    assertEquals(code.getErrorCode(), se.getErrorCode());
    assertTrue("Wrong error message", se.getMessage().contains(code.getMessage()));
    assertEquals(code.getSQLState(), se.getSQLState());
  }

  /**
   * In 5.3.0 release the TTL column will be created in SYSTEM.CATALOG and used to store TTL values,
   * but the TTL values for tables with literal expressions will continue to be stored in the HBase
   * TableDescriptor for backward compatibility. Table level TTL values will be moved from the HBase
   * TableDescriptor and stored in SYSTEM.CATALOG in a future release.
   */
  public static void assertTTLValue(Connection conn, TableName tableName, int ttl,
    boolean checkTTLInSystemCatalog) throws SQLException, IOException {
    TTLExpression tableTTL;
    if (checkTTLInSystemCatalog) {
      tableTTL = conn.unwrap(PhoenixConnection.class)
        .getTable(new PTableKey(null, tableName.getNameAsString())).getTTLExpression();
    } else {
      tableTTL = TTLExpressionFactory.create(getColumnDescriptor(conn, tableName).getTimeToLive());
    }
    TTLExpression expectedTTL = TTLExpressionFactory.create(ttl);
    Assert.assertEquals(expectedTTL, tableTTL);
  }

  /**
   * In 5.3.0 release the TTL column will be created in SYSTEM.CATALOG and used to store TTL values,
   * but the TTL values for tables with literal expressions will continue to be stored in the HBase
   * TableDescriptor for backward compatibility. When table type != TABLE (i.e. VIEW TTL) OR
   * expression is ConditionalExpression then actual TTL value is stored in SYSTEM.CATALOG and
   * TableDescriptor will be set to FOREVER When table type == TABLE (i.e. Table TTL) and expression
   * is LiteralExpression then actual TTL value is stored in TableDescriptor and SYSTEM.CATALOG will
   * be set to TTL_EXPRESSION_DEFINED_IN_TABLE_DESCRIPTOR. if expression == TTL_NOT_DEFINED OR
   * FOREVER then TTL_NOT_DEFINED will be stores in SYSTEM.CATALOG
   */

  public static void assertTTLValue(Connection conn, TTLExpression expected, String name)
    throws SQLException, IOException {
    PTable pTable = PhoenixRuntime.getTableNoCache(conn, name);
    assertTTLValue(conn, pTable, expected);
  }

  public static void assertTTLValue(Connection conn, TableName tableName, int ttl)
    throws SQLException, IOException {
    assertTTLValue(conn, tableName.getNameAsString(), TTLExpressionFactory.create(ttl));
  }

  public static void assertTTLValue(Connection conn, String name, TTLExpression expected)
    throws SQLException, IOException {
    PTable pTable = PhoenixRuntime.getTableNoCache(conn, name);
    assertTTLValue(conn, pTable, expected);
  }

  public static void assertTTLValue(Connection conn, PTable pTable, TTLExpression expected)
    throws SQLException, IOException {
    TTLExpression actualInSyscat = pTable.getTTLExpression();
    byte[] hbaseTableName = pTable.getPhysicalName().getBytes();
    ColumnFamilyDescriptor cfd = getColumnDescriptor(conn, TableName.valueOf(hbaseTableName));
    TTLExpression actual = actualInSyscat;
    if (actualInSyscat.equals(TTL_EXPRESSION_DEFINED_IN_TABLE_DESCRIPTOR)) {
      actual = TTLExpressionFactory.create(cfd.getTimeToLive());
    } else {
      assertEquals("Table descriptor TTL value did not match :-", TTL_EXPRESSION_FOREVER,
        TTLExpressionFactory.create(cfd.getTimeToLive()));
    }
    assertEquals("TTL value did not match :-", expected, actual);
  }

  public static void assertTableHasVersions(Connection conn, TableName tableName, int versions)
    throws SQLException, IOException {
    ColumnFamilyDescriptor cd = getColumnDescriptor(conn, tableName);
    Assert.assertEquals(versions, cd.getMaxVersions());
  }

  public static ColumnFamilyDescriptor getColumnDescriptor(Connection conn, TableName tableName)
    throws SQLException, IOException {
    Admin admin = conn.unwrap(PhoenixConnection.class).getQueryServices().getAdmin();
    TableDescriptor td = admin.getDescriptor(tableName);
    return td.getColumnFamilies()[0];
  }

  public static void assertRawRowCount(Connection conn, TableName table, int expectedRowCount)
    throws SQLException, IOException {
    ConnectionQueryServices cqs = conn.unwrap(PhoenixConnection.class).getQueryServices();
    int count = TestUtil.getRawRowCount(cqs.getTable(table.getName()));
    assertEquals(expectedRowCount, count);
  }

  public static int getRawRowCount(Connection conn, TableName table)
    throws SQLException, IOException {
    ConnectionQueryServices cqs = conn.unwrap(PhoenixConnection.class).getQueryServices();
    return TestUtil.getRawRowCount(cqs.getTable(table.getName()));
  }

  public static int getRawCellCount(Connection conn, TableName tableName, byte[] row)
    throws SQLException, IOException {
    ConnectionQueryServices cqs = conn.unwrap(PhoenixConnection.class).getQueryServices();
    Table table = cqs.getTable(tableName.getName());
    CellCount cellCount = getCellCount(table, true);
    return cellCount.getCellCount(Bytes.toString(row));
  }

  public static CellCount getRawCellCount(Connection conn, TableName tableName)
    throws IOException, SQLException {
    ConnectionQueryServices cqs = conn.unwrap(PhoenixConnection.class).getQueryServices();
    Table table = cqs.getTable(tableName.getName());
    return getCellCount(table, true);
  }

  public static void assertRawCellCount(Connection conn, TableName tableName, byte[] row,
    int expectedCellCount) throws SQLException, IOException {
    int count = getRawCellCount(conn, tableName, row);
    assertEquals(expectedCellCount, count);
  }

  public static void assertRowExistsAtSCN(String url, String sql, long scn, boolean shouldExist)
    throws SQLException {
    boolean rowExists = false;
    Properties props = new Properties();
    ResultSet rs;
    props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(scn));
    try (Connection conn = DriverManager.getConnection(url, props)) {
      rs = conn.createStatement().executeQuery(sql);
      rowExists = rs.next();
      if (shouldExist) {
        Assert.assertTrue("Row was not found at time " + scn + " when it should have been",
          rowExists);
      } else {
        Assert.assertFalse("Row was found at time " + scn + " when it should not have been",
          rowExists);
      }
    }

  }

  public static void assertRowHasExpectedValueAtSCN(String url, String sql, long scn, String value)
    throws SQLException {
    Properties props = new Properties();
    ResultSet rs;
    props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(scn));
    try (Connection conn = DriverManager.getConnection(url, props)) {
      rs = conn.createStatement().executeQuery(sql);
      Assert.assertTrue("Value " + value + " does not exist at scn " + scn, rs.next());
      Assert.assertEquals(value, rs.getString(1));
    }

  }

  public static String getExplainPlan(Connection conn, String sql) throws SQLException {
    try (ResultSet rs = conn.createStatement().executeQuery("EXPLAIN " + sql)) {
      return QueryUtil.getExplainPlan(rs);
    }
  }

  public static Path createTempDirectory() throws IOException {
    // We cannot use java.nio.file.Files.createTempDirectory(null),
    // because that caches the value of "java.io.tmpdir" on class load.
    return Files.createTempDirectory(Paths.get(System.getProperty("java.io.tmpdir")), null);
  }

  /**
   * Split the table at the provided split point.
   */
  public static void splitTable(Connection conn, String tableName, byte[] splitPoint)
    throws Exception {
    executeHBaseTableRegionOperation(conn, tableName, (admin, regionLocator, nRegions) -> {
      admin.split(TableName.valueOf(tableName), splitPoint);
      waitForRegionChange(regionLocator, nRegions);
    });
  }

  /**
   * Merge the given regions of a table.
   */
  public static void mergeTableRegions(Connection conn, String tableName, List<String> regions)
    throws Exception {
    byte[][] regionsToMerge = regions.stream().map(String::getBytes).toArray(byte[][]::new);

    executeHBaseTableRegionOperation(conn, tableName, (admin, regionLocator, nRegions) -> {
      admin.mergeRegionsAsync(regionsToMerge, true).get();
      waitForRegionChange(regionLocator, nRegions);
    });
  }

  @FunctionalInterface
  private interface TableOperation {
    void execute(Admin admin, RegionLocator regionLocator, int initialRegionCount) throws Exception;
  }

  private static void executeHBaseTableRegionOperation(Connection conn, String tableName,
    TableOperation operation) throws Exception {
    ConnectionQueryServices services = conn.unwrap(PhoenixConnection.class).getQueryServices();
    Configuration configuration = services.getConfiguration();
    try (org.apache.hadoop.hbase.client.Connection hbaseConn =
      ConnectionFactory.createConnection(configuration); Admin admin = services.getAdmin()) {
      RegionLocator regionLocator = hbaseConn.getRegionLocator(TableName.valueOf(tableName));
      int nRegions = regionLocator.getAllRegionLocations().size();
      operation.execute(admin, regionLocator, nRegions);
    }
  }

  private static void waitForRegionChange(RegionLocator regionLocator, int initialRegionCount)
    throws Exception {
    int retryCount = 0;
    while (retryCount < 20 && regionLocator.getAllRegionLocations().size() == initialRegionCount) {
      Thread.sleep(5000);
      retryCount++;
    }
    Assert.assertNotEquals(regionLocator.getAllRegionLocations().size(), initialRegionCount);
  }

  public static List<HRegionLocation> getAllTableRegions(Connection conn, String tableName)
    throws Exception {
    ConnectionQueryServices services = conn.unwrap(PhoenixConnection.class).getQueryServices();
    Configuration configuration = services.getConfiguration();
    RegionLocator regionLocator;
    org.apache.hadoop.hbase.client.Connection hbaseConn =
      ConnectionFactory.createConnection(configuration);
    regionLocator = hbaseConn.getRegionLocator(TableName.valueOf(tableName));
    return regionLocator.getAllRegionLocations();
  }

  public static String retainSingleQuotes(String input) {
    StringBuilder sb = new StringBuilder();
    for (int i = 0; i < input.length(); ++i) {
      char ch = input.charAt(i);
      sb.append(ch);
      if (ch == '\'') {
        sb.append('\'');
      }
    }
    return sb.toString();
  }

}
